Logo
Published on

1.1.FastAPI 快速上手

Authors
  • avatar
    Name
    xiaobai
    Twitter

1.概念介绍

1.1.WSGI 简介

  • WSGI,全名 Web Server Gateway Interface(服务器网关接口);
  • 2003 年提出,用于描述同步的 Python Web 应用(如 Django、Flask)。
  • 同步阻塞模型:一个请求必须完全处理完成后才能处理下一个请求。

1.2.ASGI 简介

  • ASGI,全名 Asynchronous Server Gateway Interface(异步服务器网管接口);
  • 是WSGI的扩展版本,旨在为Python Web服务、框架和应用之间提供一个标准的异步接口。
  • 其本身可以提供同步和异步应用,并且可以并行处理。还能处理多种通用协议,包括HTTP,HTTP2和WebSocket。
  • 同WSGI一样,需要有独立的服务器实现这种异步的网关接口,比如Daphne、Uvicorn、Hypercorn等;

1.3.Starlette简介

官方文档:https://www.starlette.io/

Starlette 是一个轻量级的 ASGI(Asynchronous Server Gateway Interface)框架,专为构建异步 Web 应用设计。它是 FastAPI 的底层框架,提供了路由、请求处理、中间件、WebSocket 支持等核心功能。Starlette 的设计目标是简单、高效,同时保持足够的灵活性,适用于各种 Web 开发场景。拥有以下特性:

  1. 异步支持:基于 Python 的 asyncio,支持异步 I/O 操作,适合高并发场景。
  2. 路由系统:提供直观的路径操作装饰器,用于定义 API 端点。
  3. 请求与响应处理:内置对 HTTP 请求和响应的支持,支持 JSON、表单数据等格式。
  4. 中间件:允许开发者在请求处理前后添加自定义逻辑。
  5. WebSocket 支持:支持实时双向通信,适用于聊天应用等场景。

FastAPI 直接继承了 Starlette 的所有功能,例如路由、请求处理和中间件。FastAPI 的 @app.get()、@app.post() 等装饰器实际上是对 Starlette 路由系统的封装。换句话说,FastAPI 在 Starlette 的基础上增加了类型检查、自动文档生成等高级特性。

1.4.FastAPI 简介

官方文档:https://fastapi.tiangolo.com/zh/;

FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 并基于标准的 Python 类型提示。拥有以下特性:

  1. 快速:可与 NodeJS 和 Go 并肩的极高性能(归功于 Starlette 和 Pydantic)。最快的 Python web 框架之一。
  2. 高效编码:提高功能开发速度约 200% 至 300%。
  3. 更少 bug:减少约 40% 的人为(开发者)导致错误。
  4. 智能:极佳的编辑器支持。处处皆可自动补全,减少调试时间。
  5. 简单:设计的易于使用和学习,阅读文档的时间更短。
  6. 简短:使代码重复最小化。通过不同的参数声明实现丰富功能。bug 更少。
  7. 健壮:生产可用级别的代码。还有自动生成的交互式文档。
  8. 标准化:基于(并完全兼容)API 的相关开放标准:OpenAPI (以前被称为 Swagger) 和 JSON Schema。
  9. 用于创建 API 的 OpenAPI 包含了路径操作,请求参数,请求体,安全性等的声明。 使用 JSON Schema (因为 OpenAPI 本身就是基于 JSON Schema 的)自动生成数据模型文档。 经过了缜密的研究后围绕这些标准而设计。并非狗尾续貂。 这也允许了在很多语言中自动生成客户端代码。

1.5.Uvicorn 简介

官方文档:https://www.uvicorn.org/

  1. FastAPI 是一个现代化的高性能 Web 框架,它使用 Python 的异步编程特性来提高 Web 应用程序的性能。
  2. 而 Uvicorn 则是一个基于 uvloop 和 httptools 实现的高性能 ASGI 服务器,可以实现异步处理 HTTP 请求。
  3. FastAPI 使用 Uvicorn 作为其默认的 Web 服务器,是因为 Uvicorn 是一个非常快速、可靠且易于使用的 ASGI 服务器,可以在处理大量并发连接时保持稳定和高效。
  4. 此外,Uvicorn 还支持 WebSocket 和 HTTP/2 等新特性,符合 FastAPI 提倡的现代 Web 开发理念。因此,使用 Uvicorn 作为 FastAPI 的 Web 服务器是一个很好的选择。

1.6.Gnuicorn 简介

1.6.1.简介

官方文档:https://gunicorn.org/

  1. Gunicorn是一款高性能的PythonWSGIHTTP服务器,具有轻量级资源消耗和高并发处理能力。
  2. 这个名字源自 “Green Unicorn”(绿色独角兽)的缩写,所以也可以结合原意辅助记忆发音~
  3. Gunicorn 服务器作为wsgi app的容器,能够与各种Web框架兼容(flask,django等),得益于gevent等技术,使用Gunicorn能够在基本不改变wsgi app代码的前提下,大幅度提高wsgi app的性能。

1.6.2.Master-Worker 模型

Master 进程

  1. 负责管理 Worker 进程(启动、监控、重启)。
  2. 不处理请求,仅控制 Worker 生命周期。
  3. 监听信号(如 HUP 重载配置、TERM 优雅关闭)。

**Worker 进程: **

  1. 实际处理 HTTP 请求的进程。
  2. 支持多种 Worker 类型(同步、异步、线程等)。

同步 Workser(默认)

  1. 每个 Worker 一次处理一个请求。
  2. 使用操作系统进程(fork)实现并发。
  3. 适合 CPU 密集型任务(如机器学习推理)。

异步 Worker(gevent/eventlet)

  1. 基于协程(非阻塞 I/O),适合高并发 I/O 密集型场景(如数据库查询)。
  2. 通过猴子补丁(monkey-patching)替换标准库的阻塞调用。

Uvicorn Worker(ASGI 支持)

  1. 运行 FastAPI、Starlette 等 ASGI 应用。
  2. 结合了 Gunicorn 的进程管理和 Uvicorn 的异步性能。

2.请求响应

如下所示,我们接收一个参数,处理这个参数,然后将计算结果返回:

@app.get("/double")
async def double(num: int):
  return {"result": num * 2}

在这个例子中,我们使用装饰器“@app.get”监听get请求,请求路径为“/double”,处理这个请求的是异步函数“async def double”,接收一个查询参数num,返回结果是一个字典“{ result : str }”

3.接收参数

3.1.路由参数

  • 也叫做路径参数、动态路由,也就是说参数在请求路径中;
  • 参数在请求路径中的好处是,在检索或者浏览请求日志的时候,很方便地就能够根据这个关键的路径参数找到目标请求信息,方便后续追踪调试;
  • 关于路由参数请看官方文档:https://fastapi.tiangolo.com/zh/tutorial/path-params/

3.2.查询参数

  • 声明的参数不是路径参数时,路径操作函数会把该参数自动解释为查询参数。
  • 查询字符串是键值对的集合,这些键值对位于 URL 的 ? 之后,以 & 分隔。
  • 关于查询参数请看官方文档:https://fastapi.tiangolo.com/zh/tutorial/query-params/

3.3.请求体参数

  • 请求体是客户端发送给 API 的数据。响应体是 API 发送给客户端的数据。
  • API 基本上肯定要发送响应体,但是客户端不一定发送请求体。
  • 使用 Pydantic 模型声明请求体,能充分利用它的功能和优点。

3.4.参数规范

Restful API请求规范:

  1. 查询:(get)/module
  2. 新建:(post)/module
  3. 更新:(put)/module
  4. 删除:(dete)/module

RPC API请求规范:

  1. 查询:(post)/module/list
  2. 新建:(post)/module/insert
  3. 更新:(post)/module/update
  4. 删除:(post)/module/delete

4.

4.1.确认启动配置

先确认一下“server.py”中,使用uvicorn启动FastAPI的代码如下所示,没有设置worker参数,也就是默认使用一个进程运行Web服务:

uvicorn.run("app.server:app", host="0.0.0.0", port=port)

4.2.准备测试接口

接下来我们准备讲一下在异步接口中调用同步阻塞代码,导致整个服务被阻塞的问题;如下所示,我们准备一个同步阻塞的接口“sync_delay”,一个异步阻塞的接口“async_delay”,以及一个能够快速响应的接口“test”;

@app.get("/test")
async def test():
  print(f"Process {os.getpid()} handling /test")
  return {"message": "Hello World"}


@app.get("/sync_delay")
async def sync_delay(delay: int = 1):
  """同步延迟delay秒"""
  print(f"Process {os.getpid()} handling /sync_delay")
  time.sleep(delay)
  return {"hello": "world"}


@app.get("/async_delay")
async def async_delay(delay: int = 1):
  """异步延迟delay秒"""
  print(f"Process {os.getpid()} handling /async_delay")
  await asyncio.sleep(delay)
  return {"hello": "world"}
  • sync_delay:同步阻塞接口,可以通过查询参数delay来设置同步阻塞多少秒之后响应;
  • async_delay:异步阻塞接口,可以通过查询参数delay来设置异步阻塞多少秒之后响应;

准备完毕之后我们启动服务,此时我们调用“test”接口一般都是立即响应;

4.3.客户端测试代码

import Axios from "axios";
import {Button, Space} from "antd";

export default function () {

  /*调用test接口,打印消耗的时间*/
  async function request_test() {
    const startTime = Date.now();
    const resp = await Axios.get('http://127.0.0.1:7002/test');
    console.log('request_test result:', resp.data);
    console.log(`耗时:${((Date.now() - startTime) / 1000).toFixed(2)}s`);
  }

  /*调用doubao链*/
  async function request_doubao_endpoint() {
    const resp = await Axios.post(
      'http://127.0.0.1:7002/doubao/invoke',
      {
        input: { messages: [{ role: 'user', content: '写一个关于海洋的小作文,300字' }], },
        config: {},
        kwargs: {},
      }
    );
    console.log('request_doubao_endpoint result:', resp.data);
  }

  /*调用同步阻塞接口*/
  async function syncDelay(delay = 3) {
    const resp = await Axios.get(`http://127.0.0.1:7002/sync_delay?delay=${delay}`);
    console.log('syncDelay result:', resp.data);
  }

  /*调用异步阻塞接口*/
  async function asyncDelay() {
    const resp = await Axios.get('http://127.0.0.1:7002/async_delay?delay=3');
    console.log('asyncDelay result:', resp.data);
  }

  /*同时调用“同步阻塞接口”以及“test”接口*/
  async function syncDelayAndTest() {
    syncDelay();
    request_test();
  }

  /*同时调用“异步阻塞接口”以及“test”接口*/
  async function asyncDelayAndTest() {
    asyncDelay();
    request_test();
  }

  /*同时调用“豆包”接口以及test接口*/
  async function doubaoAndTest() {
    request_doubao_endpoint();
    request_test();
  }

  return (
    <div style={{ padding: '1em' }}>
      <Space>
        <Button onClick={request_test}>调用test接口</Button>
        <Button onClick={syncDelayAndTest}>请求接口:同步阻塞、test</Button>
        <Button onClick={asyncDelayAndTest}>请求接口:异步阻塞、test</Button>
        <Button onClick={doubaoAndTest}>请求接口:doubao、test</Button>
      </Space>
    </div>
  );
}

客户端渲染内容: img

说明:

  1. 这里我们一共定义四个请求函数:
    1. request_test:请求test接口,并且将请求的耗时打印出来;如果单独调用这个接口,那么响应速度很快。如果调用这个接口前调用了同步阻塞接口,那么这个接口的调用会被同步阻塞接口阻塞,等同步阻塞接口调用完毕之后才会响应;
    2. syncDelay:请求同步阻塞接口,默认阻塞3s;
    3. asyncDelay:请求异步阻塞接口,默认阻塞3s;
    4. request_doubao_endpoint:请求“doubao/invoke”接口,说明其是同步还是异步;
  2. 当立即调用同步接口以及test接口,也就是点击按钮“请求接口:同步阻塞、test”时,这个test接口需要等待3s才会响应;
  3. 当立即调用异步接口以及test接口,也就是点击按钮“请求接口:异步阻塞、test”时,这个test接口会立即响应;
  4. 当调用豆包接口以及test接口,也就是点击按钮“请求接口:doubao、test”时,这个test接口会立即响应,因为doubao这个端点下的“invoke、stream、batch”都是异步接口;

关于第4点,可以查看“add_route”的源码,我们找到“Anaconda/envs/langserve/Lib/site-packages/langserve/server.py:497”的这行代码,如下所示:

img

我们点进去这个“api_hander.invoke”:

img

可以看到,“invoke”端点最终调用的是“ainvoke”方法,也就是走的异步的逻辑;另外两个“batch”以及“stream”同样的道理;

4.4.用多进程来缓解同步阻塞

这里我们设置uvicorn的启动参数,worker为2,如下所示,意思是启动两个进程来处理请求,uvicorn有一套自己的规则来实现进程之间的请求负载均衡:

uvicorn.run("app.server:app", host="0.0.0.0", port=port, workers=2)

此时我们再执行“请求接口:同步阻塞、test”时,就会发现此时“test”接口已经能够立即响应了;但是如果我们更改一下这个函数的代码,如下所示:

/*同时调用“同步阻塞接口”以及“test”接口*/
async function syncDelayAndTest() {
  syncDelay(2);
  syncDelay(3);
  request_test();
}

我们先发起两个同步阻塞请求,再发送“test”请求,你会发现,此时test又被阻塞了。因为现在worker进程只有两个,分别被两个同步阻塞进程被占用了,于是test请求只能排队等待;

4.5.负载均衡失效问题

  • 上面的代码,如果你设置两个“syncDelay”的参数一样,比如都是2或者都是3,那么你会发现,此时“test”接口又能够立即响应了;
  • 原因在于Uvicorn 默认使用 操作系统级别的负载均衡(通过 SO_REUSEPORT 套接字选项),由操作系统决定将请求分配给哪个工作进程。但以下情况可能导致请求集中在同一个进程:
    • 浏览器或 HTTP 客户端(如 Axios)默认会复用 TCP 连接(HTTP Keep-Alive)。
    • 如果两个 syncDelay() 调用是快速连续发起的(例如在同一个事件循环中几乎同时触发),它们可能会复用同一个 TCP 连接,导致操作系统将请求路由到同一个工作进程。

下面我们验证这个同样参数的请求被路由到同一个进程的问题,客户端代码中,当两个syncDelay的参数一样时,比如都是3,FastAPI打印的日志如下所示:

Process 19884 handling /sync_delay
Process 18740 handling /test
INFO:     127.0.0.1:9869 - "GET /test HTTP/1.1" 200 OK
INFO:     127.0.0.1:9868 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK
Process 19884 handling /sync_delay
INFO:     127.0.0.1:9868 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK

可以看到,两次处理“/sync_delay”都是由同一个进程来处理的,这导致进程之间的负载均衡失效了;当两个syncDelay的参数不同时,FastAPI打印的日志如下所示:

Process 19884 handling /sync_delay
Process 18740 handling /sync_delay
INFO:     127.0.0.1:10066 - "GET /sync_delay?delay=2 HTTP/1.1" 200 OK
INFO:     127.0.0.1:10067 - "GET /sync_delay?delay=3 HTTP/1.1" 200 OK
Process 18740 handling /test
INFO:     127.0.0.1:10068 - "GET /test HTTP/1.1" 200 OK

可以看到,这时候两次“/sync_delay”被分给了不同的进程来处理了;这个是uvicorn本身的负载均衡策略,如果有需求更进一步优化,应该考虑使用nginx的负载均衡来实现,简单来说一般就是使用docker在不同的服务器主机上启动多个web服务,然后使用nginx负载均衡分发到这些服务,示例配置如下所示:

http {
    upstream backend {
        server backend1.example.com;
        server backend2.example.com;
        server backend3.example.com;
    }
    server {
        listen 80;
        location / {
            proxy_pass http://backend;
        }
    }
}

5.自定义实现异步端点

5.1.自定义流式接口

@app.get("/my_stream")
async def async_delay(start: int, end: int):
  async def generate_numbers():
    current = start
    while current <= end:
      yield json.dumps({"number": current}) + '\n'
      await asyncio.sleep(0.2)
      current += 1

  return StreamingResponse(generate_numbers(), media_type="application/x-ndjson")

5.2.自定义异步端点

如下所示,我们可以自定义实现一个将链部署为接口服务的工具函数“add_async_route”,调用这个工具函数可以为Runnable对象自动添加端点“ainvoke”、“abatch”、“astream”,并且自动生成文档:

import json
import time
from typing import List

from fastapi.responses import StreamingResponse
from fastapi import FastAPI, APIRouter
from langchain_core.messages import AIMessage
from langchain_core.runnables import Runnable


def format_ai_message(ai_message: AIMessage):
  return {
    "choices": [{
      "finish_reason": "stop",
      "index": 0,
      "message": {
        "content": ai_message.content,
        "role": "assistant"
      }
    }],
    "created": int(time.time()),
    "id": ai_message.id,
    "usage": ai_message.response_metadata.get('token_usage')
  }


def add_async_routes(app: FastAPI, path: str, runnable: Runnable):
  _router = APIRouter(prefix=path, tags=[path])

  @_router.post("ainvoke")
  async def ainvoke(input: runnable.input_schema):
    ai_message: AIMessage = await runnable.ainvoke(input.model_dump())
    return format_ai_message(ai_message)

  @_router.post("astream")
  async def astream(input: runnable.input_schema):  # 这里 input_schema 需确保已正确定义
    async def generator_function():
      # 初始化一个结构,用于构建最终返回的包含 choices 等的数据
      result_template = {
        "choices": [{"delta": {}, "index": 0}],
        "created": time.time(),
        "id": "",
        "usage": None
      }
      async for chunk in runnable.astream(input.model_dump()):
        # 更新 delta 里的内容
        result_template["choices"][0]["delta"]["content"] = chunk.content
        result_template["choices"][0]["delta"]["role"] = 'assistant'

        if chunk.response_metadata.get('finish_reason') is not None:
          result_template["choices"][0]["delta"]['finish_reason'] = chunk.response_metadata.get('finish_reason')

        result_template['id'] = chunk.id
        result_template['created'] = int(time.time())
        yield f"data: {json.dumps(result_template, ensure_ascii=False)}\n\n"
      yield "data: [DONE]\n\n"

    return StreamingResponse(generator_function(), media_type="text/event-stream")

  @_router.post("abatch")
  async def abatch(inputs: list[runnable.input_schema]):
    ai_message_list: List[AIMessage] = await runnable.abatch([input.model_dump() for input in inputs])
    return [format_ai_message(ai_message) for ai_message in ai_message_list]

  app.include_router(_router)

使用示例如下所示:

add_async_routes(
  app=app,
  runnable=RunnableLambda(lambda x: x['messages']) | create_llm().with_types(input_type=ModelInputSchema),
  path="/async_doubao")

add_async_routes(
  app=app,
  runnable=RunnableLambda(lambda x: x['messages']) | create_llm("huoshan-deepseek-r1").with_types(input_type=ModelInputSchema),
  path="/async_doubao_deepseek_r1")